// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.metric;

import org.apache.doris.catalog.CloudTabletStatMgr;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.monitor.jvm.JvmStats;
import org.apache.doris.monitor.jvm.JvmStats.GarbageCollector;
import org.apache.doris.monitor.jvm.JvmStats.MemoryPool;
import org.apache.doris.monitor.jvm.JvmStats.Threads;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.Snapshot;
import com.google.common.base.Joiner;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.stream.Collectors;

/*
 * Like this:
 * # HELP doris_fe_job_load_broker_cost_ms doris_fe_job_load_broker_cost_ms
 * # TYPE doris_fe_job_load_broker_cost_ms gauge
 * doris_fe_job{job="load", type="mini", state="pending"} 0
 */
public class PrometheusMetricVisitor extends MetricVisitor {

    private static final Logger logger = LogManager.getLogger(PrometheusMetricVisitor.class);
    // jvm
    private static final String JVM_HEAP_SIZE_BYTES = "jvm_heap_size_bytes";
    private static final String JVM_NON_HEAP_SIZE_BYTES = "jvm_non_heap_size_bytes";
    private static final String JVM_YOUNG_SIZE_BYTES = "jvm_young_size_bytes";
    private static final String JVM_OLD_SIZE_BYTES = "jvm_old_size_bytes";
    private static final String JVM_THREAD = "jvm_thread";

    private static final String JVM_GC = "jvm_gc";

    private static final String HELP = "# HELP ";
    private static final String TYPE = "# TYPE ";

    private Set<String> metricNames = new HashSet();

    public PrometheusMetricVisitor() {
        super();
    }

    @Override
    public void visitJvm(JvmStats jvmStats) {
        // heap
        sb.append(Joiner.on(" ").join(HELP, JVM_HEAP_SIZE_BYTES, "jvm heap stat\n"));
        sb.append(Joiner.on(" ").join(TYPE, JVM_HEAP_SIZE_BYTES, "gauge\n"));
        sb.append(JVM_HEAP_SIZE_BYTES).append("{type=\"max\"} ").append(jvmStats.getMem().getHeapMax().getBytes())
                .append("\n");
        sb.append(JVM_HEAP_SIZE_BYTES).append("{type=\"committed\"} ")
                .append(jvmStats.getMem().getHeapCommitted().getBytes()).append("\n");
        sb.append(JVM_HEAP_SIZE_BYTES).append("{type=\"used\"} ").append(jvmStats.getMem().getHeapUsed().getBytes())
                .append("\n");
        // non heap
        sb.append(Joiner.on(" ").join(HELP, JVM_NON_HEAP_SIZE_BYTES, "jvm non heap stat\n"));
        sb.append(Joiner.on(" ").join(TYPE, JVM_NON_HEAP_SIZE_BYTES, "gauge\n"));
        sb.append(JVM_NON_HEAP_SIZE_BYTES).append("{type=\"committed\"} ")
                .append(jvmStats.getMem().getNonHeapCommitted().getBytes()).append("\n");
        sb.append(JVM_NON_HEAP_SIZE_BYTES).append("{type=\"used\"} ")
                .append(jvmStats.getMem().getNonHeapUsed().getBytes()).append("\n");

        // mem pool
        Iterator<MemoryPool> memIter = jvmStats.getMem().iterator();
        while (memIter.hasNext()) {
            MemoryPool memPool = memIter.next();
            if (memPool.getName().equalsIgnoreCase("young")) {
                sb.append(Joiner.on(" ").join(HELP, JVM_YOUNG_SIZE_BYTES, "jvm young mem pool stat\n"));
                sb.append(Joiner.on(" ").join(TYPE, JVM_YOUNG_SIZE_BYTES, "gauge\n"));
                sb.append(JVM_YOUNG_SIZE_BYTES).append("{type=\"used\"} ")
                        .append(memPool.getUsed().getBytes()).append("\n");
                sb.append(JVM_YOUNG_SIZE_BYTES).append("{type=\"peak_used\"} ")
                        .append(memPool.getPeakUsed().getBytes()).append("\n");
                sb.append(JVM_YOUNG_SIZE_BYTES).append("{type=\"max\"} ")
                        .append(memPool.getMax().getBytes()).append("\n");
            } else if (memPool.getName().equalsIgnoreCase("old")) {
                sb.append(Joiner.on(" ").join(HELP, JVM_OLD_SIZE_BYTES, "jvm old mem pool stat\n"));
                sb.append(Joiner.on(" ").join(TYPE, JVM_OLD_SIZE_BYTES, "gauge\n"));
                sb.append(JVM_OLD_SIZE_BYTES).append("{type=\"used\"} ")
                        .append(memPool.getUsed().getBytes()).append("\n");
                sb.append(JVM_OLD_SIZE_BYTES).append("{type=\"peak_used\"} ")
                        .append(memPool.getPeakUsed().getBytes()).append("\n");
                sb.append(JVM_OLD_SIZE_BYTES).append("{type=\"max\"} "
                ).append(memPool.getMax().getBytes()).append("\n");
            }
        }

        // gc
        sb.append(Joiner.on(" ").join(HELP, JVM_GC, "jvm gc stat\n"));
        sb.append(Joiner.on(" ").join(TYPE, JVM_GC, "gauge\n"));
        for (GarbageCollector gc : jvmStats.getGc()) {
            sb.append(JVM_GC).append("{");
            sb.append("name=\"").append(gc.getName()).append(" Count").append("\", ").append("type=\"count\"} ")
                    .append(gc.getCollectionCount()).append("\n");
            sb.append(JVM_GC).append("{");
            sb.append("name=\"").append(gc.getName()).append(" Time").append("\", ").append("type=\"time\"} ")
                    .append(gc.getCollectionTime().getMillis()).append("\n");
        }

        // threads
        Threads threads = jvmStats.getThreads();
        sb.append(Joiner.on(" ").join(HELP, JVM_THREAD, "jvm thread stat\n"));
        sb.append(Joiner.on(" ").join(TYPE, JVM_THREAD, "gauge\n"));
        sb.append(JVM_THREAD).append("{type=\"count\"} ")
                .append(threads.getCount()).append("\n");
        sb.append(JVM_THREAD).append("{type=\"peak_count\"} ")
                .append(threads.getPeakCount()).append("\n");
        sb.append(JVM_THREAD).append("{type=\"new_count\"} ")
                .append(threads.getThreadsNewCount()).append("\n");
        sb.append(JVM_THREAD).append("{type=\"runnable_count\"} ")
                .append(threads.getThreadsRunnableCount()).append("\n");
        sb.append(JVM_THREAD).append("{type=\"blocked_count\"} ")
                .append(threads.getThreadsBlockedCount()).append("\n");
        sb.append(JVM_THREAD).append("{type=\"waiting_count\"} ")
                .append(threads.getThreadsWaitingCount()).append("\n");
        sb.append(JVM_THREAD).append("{type=\"timed_waiting_count\"} ")
                .append(threads.getThreadsTimedWaitingCount()).append("\n");
        sb.append(JVM_THREAD).append("{type=\"terminated_count\"} ")
                .append(threads.getThreadsTerminatedCount()).append("\n");
        return;
    }

    @Override
    public void visit(String prefix, @SuppressWarnings("rawtypes") Metric metric) {
        // title
        final String fullName = prefix + metric.getName();
        if (!metricNames.contains(fullName)) {
            sb.append(HELP).append(fullName).append(" ").append(metric.getDescription()).append("\n");
            sb.append(TYPE).append(fullName).append(" ").append(metric.getType().name().toLowerCase()).append("\n");
            metricNames.add(fullName);
        }
        sb.append(fullName);

        // name
        @SuppressWarnings("unchecked")
        List<MetricLabel> labels = metric.getLabels();
        if (!labels.isEmpty()) {
            sb.append("{");
            List<String> labelStrs = labels.stream().map(l -> l.getKey() + "=\"" + l.getValue()
                    + "\"").collect(Collectors.toList());
            sb.append(Joiner.on(", ").join(labelStrs));
            sb.append("}");
        }

        // value
        sb.append(" ").append(metric.getValue().toString()).append("\n");
    }

    @Override
    public void visitHistogram(String prefix, String name, Histogram histogram) {
        // part.part.part.k1=v1.k2=v2
        List<String> names = new ArrayList<>();
        List<String> tags = new ArrayList<>();
        for (String part : name.split("\\.")) {
            String[] kv = part.split("=");
            if (kv.length == 1) {
                names.add(kv[0]);
            } else if (kv.length == 2) {
                tags.add(String.format("%s=\"%s\"", kv[0], kv[1]));
            }
        }
        final String fullName = prefix + String.join("_", names);
        final String fullTag = String.join(",", tags);
        // we should define metric name only once
        if (!metricNames.contains(fullName)) {
            sb.append(HELP).append(fullName).append(" ").append("\n");
            sb.append(TYPE).append(fullName).append(" ").append("summary\n");
            metricNames.add(fullName);
        }
        String delimiter = tags.isEmpty() ? "" : ",";
        Snapshot snapshot = histogram.getSnapshot();
        sb.append(fullName).append("{quantile=\"0.75\"").append(delimiter).append(fullTag).append("} ")
            .append(snapshot.get75thPercentile()).append("\n");
        sb.append(fullName).append("{quantile=\"0.95\"").append(delimiter).append(fullTag).append("} ")
            .append(snapshot.get95thPercentile()).append("\n");
        sb.append(fullName).append("{quantile=\"0.98\"").append(delimiter).append(fullTag).append("} ")
            .append(snapshot.get98thPercentile()).append("\n");
        sb.append(fullName).append("{quantile=\"0.99\"").append(delimiter).append(fullTag).append("} ")
            .append(snapshot.get99thPercentile()).append("\n");
        sb.append(fullName).append("{quantile=\"0.999\"").append(delimiter).append(fullTag).append("} ")
            .append(snapshot.get999thPercentile()).append("\n");
        sb.append(fullName).append("_sum{").append(fullTag).append("} ")
                .append(histogram.getCount() * snapshot.getMean()).append("\n");
        sb.append(fullName).append("_count{").append(fullTag).append("} ")
                .append(histogram.getCount()).append("\n");
    }

    @Override
    public void visitNodeInfo() {
        final String NODE_INFO = "node_info";
        sb.append(Joiner.on(" ").join(TYPE, NODE_INFO, "gauge\n"));
        sb.append(NODE_INFO).append("{type=\"fe_node_num\", state=\"total\"} ")
                .append(Env.getCurrentEnv().getFrontends(null).size()).append("\n");
        sb.append(NODE_INFO).append("{type=\"be_node_num\", state=\"total\"} ")
                .append(Env.getCurrentSystemInfo().getAllBackendIds(false).size()).append("\n");
        sb.append(NODE_INFO).append("{type=\"be_node_num\", state=\"alive\"} ")
                .append(Env.getCurrentSystemInfo().getAllBackendIds(true).size()).append("\n");
        sb.append(NODE_INFO).append("{type=\"be_node_num\", state=\"decommissioned\"} ")
                .append(Env.getCurrentSystemInfo().getDecommissionedBackendIds().size()).append("\n");
        sb.append(NODE_INFO).append("{type=\"broker_node_num\", state=\"dead\"} ").append(
                Env.getCurrentEnv().getBrokerMgr().getAllBrokers()
                        .stream().filter(b -> !b.isAlive).count()).append("\n");

        // only master FE has this metrics, to help the Grafana knows who is the master
        if (Env.getCurrentEnv().isMaster()) {
            sb.append(NODE_INFO).append("{type=\"is_master\"} ").append(1).append("\n");
        }
        return;
    }

    @Override
    public void visitCloudTableStats() {
        if (Config.isNotCloudMode() || Env.getCurrentEnv().getTabletStatMgr() == null) {
            return;
        }

        CloudTabletStatMgr tabletStatMgr = (CloudTabletStatMgr) Env.getCurrentEnv().getTabletStatMgr();

        StringBuilder dataSizeBuilder = new StringBuilder();
        StringBuilder rowsetCountBuilder = new StringBuilder();
        StringBuilder segmentCountBuilder = new StringBuilder();
        StringBuilder tableRowCountBuilder = new StringBuilder();

        Collection<OlapTable.Statistics> values = tabletStatMgr.getCloudTableStatsMap().values();
        // calc totalTableSize
        long totalTableSize = 0;
        for (OlapTable.Statistics stats : values) {
            totalTableSize += stats.getDataSize();
        }
        // output top N metrics
        if (values.size() > Config.prom_output_table_metrics_limit) {
            // only copy elements if number of tables > prom_output_table_metrics_limit
            PriorityQueue<OlapTable.Statistics> topStats = new PriorityQueue<>(
                    Config.prom_output_table_metrics_limit,
                    Comparator.comparingLong(OlapTable.Statistics::getDataSize));
            for (OlapTable.Statistics stats : values) {
                if (topStats.size() < Config.prom_output_table_metrics_limit) {
                    topStats.offer(stats);
                } else if (!topStats.isEmpty()
                        && stats.getDataSize() > topStats.peek().getDataSize()) {
                    topStats.poll();
                    topStats.offer(stats);
                }
            }
            values = topStats;
        }
        for (OlapTable.Statistics stats : values) {

            dataSizeBuilder.append("doris_fe_table_data_size{db_name=\"");
            dataSizeBuilder.append(stats.getDbName());
            dataSizeBuilder.append("\", table_name=\"");
            dataSizeBuilder.append(stats.getTableName());
            dataSizeBuilder.append("\"} ");
            dataSizeBuilder.append(stats.getDataSize());
            dataSizeBuilder.append("\n");

            rowsetCountBuilder.append("doris_fe_table_rowset_count{db_name=\"");
            rowsetCountBuilder.append(stats.getDbName());
            rowsetCountBuilder.append("\", table_name=\"");
            rowsetCountBuilder.append(stats.getTableName());
            rowsetCountBuilder.append("\"} ");
            rowsetCountBuilder.append(stats.getRowsetCount());
            rowsetCountBuilder.append("\n");

            segmentCountBuilder.append("doris_fe_table_segment_count{db_name=\"");
            segmentCountBuilder.append(stats.getDbName());
            segmentCountBuilder.append("\", table_name=\"");
            segmentCountBuilder.append(stats.getTableName());
            segmentCountBuilder.append("\"} ");
            segmentCountBuilder.append(stats.getSegmentCount());
            segmentCountBuilder.append("\n");

            tableRowCountBuilder.append("doris_fe_table_row_count{db_name=\"");
            tableRowCountBuilder.append(stats.getDbName());
            tableRowCountBuilder.append("\", table_name=\"");
            tableRowCountBuilder.append(stats.getTableName());
            tableRowCountBuilder.append("\"} ");
            tableRowCountBuilder.append(stats.getRowCount());
            tableRowCountBuilder.append("\n");
        }

        if (dataSizeBuilder.length() > 0) {
            sb.append(Joiner.on(" ").join(HELP, "doris_fe_table_data_size", "table data size\n"));
            sb.append(Joiner.on(" ").join(TYPE, "doris_fe_table_data_size", "gauge\n"));
            sb.append(dataSizeBuilder.toString());
        }

        if (segmentCountBuilder.length() > 0) {
            sb.append(Joiner.on(" ").join(HELP, "doris_fe_table_rowset_count", "table rowset count\n"));
            sb.append(Joiner.on(" ").join(TYPE, "doris_fe_table_rowset_count", "gauge\n"));
            sb.append(rowsetCountBuilder.toString());
        }

        if (segmentCountBuilder.length() > 0) {
            sb.append(Joiner.on(" ").join(HELP, "doris_fe_table_segment_count", "table segment count\n"));
            sb.append(Joiner.on(" ").join(TYPE, "doris_fe_table_segment_count", "gauge\n"));
            sb.append(segmentCountBuilder.toString());
        }

        if (tableRowCountBuilder.length() > 0) {
            sb.append(Joiner.on(" ").join(HELP, "doris_fe_table_row_count", "table row count\n"));
            sb.append(Joiner.on(" ").join(TYPE, "doris_fe_table_row_count", "gauge\n"));
            sb.append(tableRowCountBuilder.toString());
        }

        // total table size
        sb.append(Joiner.on(" ").join(HELP, "doris_fe_table_data_size_total", "total table data size\n"));
        sb.append(Joiner.on(" ").join(TYPE, "doris_fe_table_data_size_total", "gauge\n"));
        sb.append("doris_fe_table_data_size_total ");
        sb.append(totalTableSize);
        sb.append("\n");

        // total recycle bin size
        long totalRecycleSize = 0;
        for (Map.Entry<Long, Pair<Long, Long>> entry : Env.getCurrentRecycleBin().getDbToRecycleSize().entrySet()) {
            totalRecycleSize += entry.getValue().first;
        }
        sb.append(Joiner.on(" ").join(HELP, "doris_fe_recycle_data_size_total", "total recycle bin data size\n"));
        sb.append(Joiner.on(" ").join(TYPE, "doris_fe_recycle_data_size_total", "gauge\n"));
        sb.append("doris_fe_recycle_data_size_total ");
        sb.append(totalRecycleSize);
        sb.append("\n");
        return;
    }

    @Override
    public void visitWorkloadGroup() {
        StringBuilder tmpSb = new StringBuilder();
        try {
            String counterTitle = "doris_workload_group_query_detail";
            tmpSb.append("# HELP " + counterTitle + "\n");
            tmpSb.append("# TYPE " + counterTitle + " counter\n");
            Map<String, List<String>> workloadGroupMap = Env.getCurrentEnv().getWorkloadGroupMgr()
                    .getWorkloadGroupQueryDetail();
            for (Map.Entry<String, List<String>> entry : workloadGroupMap.entrySet()) {
                String name = entry.getKey();
                List<String> valList = entry.getValue();
                tmpSb.append(String.format("%s{name=\"%s\", type=\"%s\"} %s\n", counterTitle, name, "running_query_num",
                        valList.get(0)));
                tmpSb.append(String.format("%s{name=\"%s\", type=\"%s\"} %s\n", counterTitle, name, "waiting_query_num",
                        valList.get(1)));
            }
            sb.append(tmpSb);
        } catch (Exception e) {
            logger.warn("error happends when get workload group query detail ", e);
        }
    }
}
